feat: pipeline parallelism optimizations - load balancing, 1F1B scheduling, activation checkpointing#845
Conversation
…cheduling, activation checkpointing (#463)

- Add IPipelinePartitionStrategy interface and UniformPartitionStrategy (default)
- Add LoadBalancedPartitionStrategy using dynamic-programming min-max partitioning
- Add IPipelineSchedule interface with GPipeSchedule and OneForwardOneBackwardSchedule (1F1B)
- Add ActivationCheckpointConfig with configurable checkpoint frequency and recompute strategies
- Integrate all three optimizations into PipelineParallelModel with backward compatibility
- 1F1B schedule reduces the pipeline bubble from ~50% to ~12-15%
- Activation checkpointing reduces memory from O(L) to O(sqrt(L))

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Walkthrough

Adds a production-grade pipeline-parallel subsystem: a new scheduling API and multiple schedule implementations (GPipe, 1F1B, ZB-H1/H2, Interleaved, Looped-BFS, ZB-V), partition strategies (uniform, load-balanced), activation checkpoint config and recompute strategy, and integrates these into PipelineParallelModel and builder interfaces for schedule-driven, micro-batch-aware training.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant Scheduler
participant Stage0 as "Rank A: Stage 0"
participant Stage1 as "Rank B: Stage 1"
participant Stage2 as "Rank C: Stage 2"
Client->>Scheduler: Request schedule (P=3, M=4)
Scheduler->>Scheduler: Generate 1F1B schedule (warmup, steady, cooldown)
rect rgba(100,150,255,0.5)
Note over Scheduler: Warmup
Scheduler->>Stage0: Forward(m=0)
Stage0->>Stage1: Send activations(m=0)
Scheduler->>Stage1: Forward(m=0)
Stage1->>Stage2: Send activations(m=0)
end
rect rgba(150,200,100,0.5)
Note over Scheduler: Steady (interleaved)
Scheduler->>Stage0: Forward(m=1)
Stage0->>Stage1: Send activations(m=1)
Scheduler->>Stage2: Backward(m=0)
Stage2->>Stage1: Send gradients(m=0)
Stage1->>Stage0: Send gradients(m=0)
end
rect rgba(200,150,100,0.5)
Note over Scheduler: Cooldown
Scheduler->>Stage2: Backward(m=3)
Stage2->>Stage1: Send gradients(m=3)
end
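The warmup/steady/cooldown structure shown in the diagram can be sketched in a few lines. This is an illustrative Python sketch, not the PR's C# `OneForwardOneBackwardSchedule`; the function name and `("F"/"B", index)` op encoding are hypothetical:

```python
def one_f1b_schedule(stage_id: int, num_stages: int, num_micro_batches: int):
    """Return ("F"|"B", micro_batch_index) ops for one stage: warmup, steady, cooldown."""
    # Warmup: earlier stages run extra forwards so later stages have work queued.
    warmup = min(num_stages - 1 - stage_id, num_micro_batches)
    ops = [("F", m) for m in range(warmup)]
    fwd, bwd = warmup, 0
    # Steady state: alternate one forward with one backward (the "1F1B" pattern).
    while fwd < num_micro_batches:
        ops.append(("F", fwd)); fwd += 1
        ops.append(("B", bwd)); bwd += 1
    # Cooldown: drain the remaining backwards.
    while bwd < num_micro_batches:
        ops.append(("B", bwd)); bwd += 1
    return ops
```

With P=3 and M=4 as in the diagram, stage 0 warms up with two forwards before its first backward, while the last stage alternates F/B immediately.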
sequenceDiagram
participant Model
participant ActivationCache
participant CheckpointStore
participant GradAcc as GradientAccumulator
Model->>ActivationCache: Store activation A_i
alt ShouldCheckpointActivation == true
ActivationCache->>CheckpointStore: Persist checkpoint A_i
else
ActivationCache->>ActivationCache: Keep in-memory A_i
end
Model->>Model: Backward(m)
alt Needed activation not in-memory
CheckpointStore->>Model: Recompute activation(s) from checkpoint
else
ActivationCache->>Model: Retrieve activation(s)
end
Model->>GradAcc: Accumulate ∇W
GradAcc->>Model: Apply averaged gradients after accumulation
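The checkpoint/recompute flow in the second diagram can also be sketched compactly. A minimal Python sketch, assuming a checkpoint-every-N policy and recomputation from the nearest earlier checkpoint (names are hypothetical, not the PR's API):

```python
def forward_with_checkpoints(x, layers, every_n):
    """Run forward, keeping only every N-th layer input as a checkpoint."""
    ckpts = {}
    for i, layer in enumerate(layers):
        if i % every_n == 0:
            ckpts[i] = x          # keep only ~L/N activations in memory
        x = layer(x)
    return x, ckpts

def recompute_activation(target_layer, layers, ckpts):
    """Re-run forward from the nearest checkpoint at or before target_layer."""
    start = max(i for i in ckpts if i <= target_layer)
    x = ckpts[start]
    for i in range(start, target_layer):
        x = layers[i](x)
    return x
```

Choosing N ≈ sqrt(L) is what yields the O(sqrt(L)) memory figure claimed in the PR description, at the cost of one extra partial forward per backward.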
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 5 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (5 passed)
Pull request overview
This PR implements three major optimizations for pipeline parallel training as described in issue #463: load-balanced layer partitioning, 1F1B micro-batch scheduling, and activation checkpointing. While the architectural design and interfaces are well-conceived, the implementation contains several critical bugs that prevent the features from working correctly, particularly around activation checkpointing and gradient communication.
Changes:
- Adds extensible scheduling infrastructure with IPipelineSchedule interface and two implementations (GPipe, 1F1B)
- Adds partitioning strategies via IPipelinePartitionStrategy with uniform and load-balanced implementations
- Adds activation checkpointing configuration framework (though implementation is incomplete/broken)
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
| src/Interfaces/IPipelineSchedule.cs | Defines scheduling strategy interface for ordering forward/backward passes with warmup/cooldown phases |
| src/Interfaces/IPipelinePartitionStrategy.cs | Defines partitioning strategy interface for distributing model parameters across pipeline stages |
| src/DistributedTraining/UniformPartitionStrategy.cs | Implements simple equal-sized parameter partitioning (original default behavior) |
| src/DistributedTraining/LoadBalancedPartitionStrategy.cs | Implements dynamic programming-based cost-balanced partitioning with estimated computational costs |
| src/DistributedTraining/GPipeSchedule.cs | Implements all-forward-then-all-backward scheduling (synchronous pipeline) |
| src/DistributedTraining/OneForwardOneBackwardSchedule.cs | Implements interleaved 1F1B scheduling with warmup/steady-state/cooldown phases |
| src/DistributedTraining/ActivationCheckpointConfig.cs | Configuration class for activation checkpointing with frequency and recompute strategy options |
| src/DistributedTraining/PipelineParallelModel.cs | Integrates all three optimizations with schedule-driven execution loop and checkpointing hooks |
Actionable comments posted: 8
🤖 Fix all issues with AI agents
In `@src/DistributedTraining/ActivationCheckpointConfig.cs`:
- Around line 52-75: The CheckpointEveryNLayers and MaxActivationsInMemory
properties lack validation and can lead to division-by-zero or invalid negative
values; add input checks in ActivationCheckpointConfig: enforce
CheckpointEveryNLayers > 0 (unless a special mode explicitly allows 0—if so
document and guard usage) and enforce MaxActivationsInMemory >= 0. Implement
this by adding validation logic (either in property setters for
CheckpointEveryNLayers and MaxActivationsInMemory, or a public Validate() method
on ActivationCheckpointConfig that throws
ArgumentException/ArgumentOutOfRangeException with clear messages) and ensure
callers construct/validate instances (e.g., call Validate() from the constructor
or factory) so invalid configs fail fast.
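The fail-fast validation this comment asks for can be sketched as follows. This is a hypothetical Python sketch for illustration only; the real fix belongs in the C# ActivationCheckpointConfig property setters or a Validate() method:

```python
class ActivationCheckpointConfig:
    """Sketch: reject invalid checkpoint settings at construction time."""
    def __init__(self, checkpoint_every_n_layers: int, max_activations_in_memory: int):
        if checkpoint_every_n_layers <= 0:
            raise ValueError(
                "checkpoint_every_n_layers must be > 0 "
                "(0 would cause division-by-zero in 'layer % N' checks)")
        if max_activations_in_memory < 0:
            raise ValueError("max_activations_in_memory must be >= 0")
        self.checkpoint_every_n_layers = checkpoint_every_n_layers
        self.max_activations_in_memory = max_activations_in_memory
```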
In `@src/DistributedTraining/GPipeSchedule.cs`:
- Around line 41-57: In GetSchedule, validate numStages (and numMicroBatches)
before checking stageId so you don't throw ArgumentOutOfRangeException for
stageId when numStages is invalid; move the numStages <= 0 check (and
numMicroBatches <= 0 if desired) above the stageId bounds check in the
GetSchedule method and keep the stageId validation afterwards, throwing
ArgumentException for numStages and ArgumentOutOfRangeException for an invalid
stageId.
In `@src/DistributedTraining/LoadBalancedPartitionStrategy.cs`:
- Around line 55-141: The constructor and BuildLayerSizes currently conflate a
single-element _layerBoundaries array with "auto-detect" behavior and do not
validate ordering/ranges; to fix, change the int[] ctor
(LoadBalancedPartitionStrategy(int[] layerBoundaries,...)) to validate that
layerBoundaries is non-null, has length >=1, contains strictly increasing,
non-negative values and that the last boundary < totalParameters (or at least
document/validate later), and throw ArgumentException for invalid input; keep
auto-detect behavior only for the other ctor LoadBalancedPartitionStrategy(int
estimatedLayerSize,...) by adding a private flag like _isAutoDetect (set in the
estimatedLayerSize ctor) and have BuildLayerSizes check _isAutoDetect (not
_layerBoundaries.Length==1) to generate synthetic layers, and in BuildLayerSizes
also validate boundaries are sorted and within range and compute sizes using
consecutive boundary differences so parameters aren’t silently dropped.
In `@src/DistributedTraining/PipelineParallelModel.cs`:
- Around line 216-224: The checkpointing implementation is incomplete and leaves
dead state; either remove the partial logic or make it fail-fast. Update the
code paths that reference RecomputeStrategy and CheckpointFirstLayer so that if
any checkpointing mode is selected the PipelineParallelModel throws a clear
NotImplementedException (or ArgumentException) at construction or before the
forward loop; remove or stop populating unused microBatchOutputs and only keep
microBatchInputs and _checkpointedActivations if they are actively used,
otherwise delete those fields and related writes to eliminate dead state; ensure
any remaining checkpoint-related flags are documented and guarded so no silent
partial behavior runs in production.
- Around line 226-243: The loop is reusing the same input/output for every
micro‑batch which corrupts gradients when _microBatchSize > 1; update the code
that builds microBatchInputs/microBatchOutputs and checkpointing to index into
the per‑microbatch collections using op.MicroBatchIndex (i.e., obtain stageInput
= <microbatch-list>[op.MicroBatchIndex] or call a proper slice helper instead of
reusing the top‑level input), store checkpointed activation into
_checkpointedActivations[op.MicroBatchIndex], and similarly ensure the
expectedOutput/loss lookup uses expectedOutputList[op.MicroBatchIndex] (or fail
fast if only scalar inputs are supported). Locate and fix references around
GetStageInput, ShouldCheckpointActivation, _checkpointedActivations,
microBatchInputs, WrappedModel.Predict, microBatchOutputs and the expectedOutput
usage so every microbatch uses its own indexed input/output.
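The shape of the fix described above can be sketched like this: each scheduled op indexes its own micro-batch slot rather than reusing one shared input. Hypothetical Python sketch (op encoding, `model`, and the dict-based stores are illustrative, not the PR's C# types):

```python
def run_forward_ops(ops, micro_batch_inputs, model, checkpointed):
    """Each op = ("F", micro_batch_index); index per-micro-batch state, never share it."""
    outputs = {}
    for _, m in ops:
        stage_input = micro_batch_inputs[m]   # indexed by op.MicroBatchIndex, not shared
        outputs[m] = model(stage_input)
        checkpointed[m] = stage_input         # per-micro-batch checkpoint slot
    return outputs
```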
- Around line 87-139: Public constructor and properties (PipelineParallelModel,
Schedule, PartitionStrategy, CheckpointConfig) expose knobs
that bypass the intended facade; either make PipelineParallelModel internal and
keep the public surface via AiModelBuilder/AiModelResult, or keep the class
public but make those properties/constructor overloads internal/private and add
corresponding configuration entry points on AiModelBuilder that set
schedule/partition/checkpoint before building. Update visibility for the
constructor and/or Schedule/PartitionStrategy/CheckpointConfig properties or
move construction logic behind AiModelBuilder methods (e.g.,
AddPipelineSchedule, WithPartitionStrategy, WithCheckpointConfig) so external
users only interact through AiModelBuilder/AiModelResult.
- Around line 246-283: The tag math currently uses overlapping namespaces
(SendActivationsForward using tag: op.MicroBatchIndex * 10 and gradient
Send/Receive using tag: 1000 + op.MicroBatchIndex), which can collide for large
microBatchIndex; introduce dedicated constants (e.g., ACTIVATION_TAG_BASE and
GRADIENT_TAG_BASE or ACTIVATION_TAG_MULTIPLIER and GRADIENT_TAG_BASE) and
replace occurrences in SendActivationsForward, the gradient Send/Receive calls
(where tag is 1000 + op.MicroBatchIndex) and the other region mentioned (lines
~340-370) so activations use ACTIVATION_TAG_BASE + op.MicroBatchIndex and
gradients use GRADIENT_TAG_BASE + op.MicroBatchIndex (or multiply
microBatchIndex by a large non-overlapping multiplier) to guarantee
non‑overlapping tag ranges.
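The non-overlapping tag scheme proposed here can be sketched as follows (constant names and the base value are hypothetical; any base larger than the maximum possible micro-batch index works):

```python
ACTIVATION_TAG_BASE = 0
GRADIENT_TAG_BASE = 1_000_000   # must exceed the largest possible activation tag

def activation_tag(micro_batch_index: int) -> int:
    # Activations occupy [ACTIVATION_TAG_BASE, GRADIENT_TAG_BASE).
    return ACTIVATION_TAG_BASE + micro_batch_index

def gradient_tag(micro_batch_index: int) -> int:
    # Gradients occupy [GRADIENT_TAG_BASE, ...), so the two ranges never collide.
    return GRADIENT_TAG_BASE + micro_batch_index
```

Contrast with the flagged code: `m * 10` and `1000 + m` collide as soon as `m * 10 >= 1000 + m'` for valid indices m, m'.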
- Around line 171-177: The code uses
_partitionStrategy.ComputePartition(totalParams, _numStages) and immediately
indexes partitions[_stageId]; validate the returned partitions before indexing
by checking partitions is not null, partitions.Length == _numStages, and that
the entry for partitions[_stageId] has non‑negative StartIndex and Size and that
StartIndex + Size <= totalParams; if any check fails, throw an informative
exception (or fall back to a safe default partitioning) rather than proceeding
to assign ShardStartIndex and ShardSize from an invalid partition.
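The partition sanity checks requested above can be sketched as one guard function (illustrative Python; the real version would throw .NET exceptions before ShardStartIndex/ShardSize are assigned):

```python
def validate_partitions(partitions, num_stages, total_params):
    """Fail fast if a partition strategy returns a malformed result."""
    if partitions is None or len(partitions) != num_stages:
        raise ValueError("partition strategy must return exactly one entry per stage")
    for start, size in partitions:
        if start < 0 or size < 0 or start + size > total_params:
            raise ValueError(f"invalid partition (start={start}, size={size}, "
                             f"total={total_params})")
    return partitions
```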
// This is approximately half of GPipe's bubble for large M
int p = numStages;
int m = numMicroBatches;
return (double)(p - 1) / (2 * m + p - 1);
Check failure — Code scanning / CodeQL: Possible loss of precision (Error)

Copilot Autofix (AI, about 1 hour ago)
In general, to avoid loss of precision or overflow when converting the result of integer arithmetic to a floating-point type, ensure that multiplication and/or division are performed in floating-point arithmetic by casting at least one operand to a floating type before the operation. This prevents intermediate integer overflow and maintains the intended mathematical result.
Here, the best fix is to make the denominator calculation occur in double instead of long. We can achieve this by casting one of the operands to double (for example, 2.0 * m or (double)(2 * m + p - 1)) before the multiplication or addition, so the entire expression is evaluated as double. To be explicit and avoid any integer overflow before the cast, we should introduce the double cast at the earliest operation in the denominator, e.g., 2.0 * m, ensuring that 2.0 * m + p - 1 is all in double space.
Concretely, in src/DistributedTraining/OneForwardOneBackwardSchedule.cs, in the EstimateBubbleFraction method, change the return statement at line 147 from:

    return (double)(p - 1) / (2 * m + p - 1);

to a version where the denominator is computed as double from the start, such as:

    return (double)(p - 1) / (2.0 * m + p - 1);

This keeps the logic identical while eliminating the potential long overflow and the associated CodeQL warning. No new methods or imports are required.
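The bubble-fraction formulas quoted in this review can be sanity-checked numerically. A small Python sketch (Python's `/` is already floating-point division, so the C#-specific precision pitfall flagged here does not arise); the formulas are the ones cited in this PR, `(P-1)/(2M+P-1)` for 1F1B and `(P-1)/(3M+P-1)` for ZB-H1:

```python
def bubble_1f1b(p: int, m: int) -> float:
    # 1F1B bubble fraction as quoted in the review.
    return (p - 1) / (2 * m + p - 1)

def bubble_zb_h1(p: int, m: int) -> float:
    # ZB-H1 bubble fraction as quoted in the review (~1/3 smaller denominator impact).
    return (p - 1) / (3 * m + p - 1)
```

Increasing the micro-batch count M shrinks the bubble for a fixed stage count P, which is why these schedules are evaluated "for large M".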
@@ -144,6 +144,6 @@
     // This is approximately half of GPipe's bubble for large M
     long p = numStages;
     long m = numMicroBatches;
-    return (double)(p - 1) / (2 * m + p - 1);
+    return (double)(p - 1) / (2.0 * m + p - 1);
 }
 }
{
    int numLayers = layerSizes.Length;

    if (numStages >= numLayers)
    {
        // More stages than layers: assign one layer per stage, remaining stages get empty shards
        return AssignOneLayerPerStage(layerSizes, numStages);
    }

    // Prefix sums for parameter sizes and costs
    var paramPrefix = new long[numLayers + 1];
    var costPrefix = new double[numLayers + 1];

    for (int i = 0; i < numLayers; i++)
    {
        paramPrefix[i + 1] = paramPrefix[i] + layerSizes[i];
        costPrefix[i + 1] = costPrefix[i] + layerCosts[i];
    }

    // dp[s][l] = minimum of maximum stage cost when assigning layers 0..l-1 to stages 0..s-1
    var dp = new double[numStages + 1][];
    var splitPoint = new int[numStages + 1][];

    for (int s = 0; s <= numStages; s++)
    {
        dp[s] = new double[numLayers + 1];
        splitPoint[s] = new int[numLayers + 1];
        for (int i = 0; i < dp[s].Length; i++)
        {
            dp[s][i] = double.MaxValue;
        }
    }

    dp[0][0] = 0.0;

    // Base case: one stage gets all layers up to l
    for (int l = 1; l <= numLayers; l++)
    {
        dp[1][l] = costPrefix[l];
        splitPoint[1][l] = 0;
    }

    // Fill DP table
    for (int s = 2; s <= numStages; s++)
    {
        for (int l = s; l <= numLayers; l++)
        {
            // Try all possible split points for the last stage
            for (int k = s - 1; k < l; k++)
            {
                double lastStageCost = costPrefix[l] - costPrefix[k];
                double candidate = Math.Max(dp[s - 1][k], lastStageCost);

                if (candidate < dp[s][l])
                {
                    dp[s][l] = candidate;
                    splitPoint[s][l] = k;
                }
            }
        }
    }

    // Backtrack to find optimal partition
    var stageEndLayers = new int[numStages];
    int currentLayer = numLayers;

    for (int s = numStages; s >= 1; s--)
    {
        stageEndLayers[s - 1] = currentLayer;
        currentLayer = splitPoint[s][currentLayer];
    }

    // Convert layer assignments to parameter partitions
    var partitions = new (int StartIndex, int Size)[numStages];
    int layerStart = 0;

    for (int s = 0; s < numStages; s++)
    {
        int layerEnd = stageEndLayers[s];
        int paramStart = (int)paramPrefix[layerStart];
        int paramSize = (int)(paramPrefix[layerEnd] - paramPrefix[layerStart]);
        partitions[s] = (paramStart, paramSize);
        layerStart = layerEnd;
    }

    return partitions;
}
Check notice — Code scanning / CodeQL: Block with too many statements (Note)

Copilot Autofix (AI, about 4 hours ago)
In general, to fix a "block with too many statements" issue, you break the large method into smaller, focused helper methods, each handling a distinct logical part of the algorithm. This reduces the number of complex statements (loops/branches) per block, improves readability, and keeps behavior identical by preserving the same data flow and parameters.
For OptimalPartition, the main logical phases are: (1) handle the trivial case where numStages >= numLayers; (2) build prefix sums; (3) allocate and initialize the DP and split‑point tables; (4) handle the base case for a single stage; (5) run the nested loops that fill the DP; (6) backtrack to compute layer boundaries; and (7) translate the layer boundaries into parameter partitions. The single best way to refactor without changing behavior is to extract these phases into private helper methods that receive and return only the data they need, while leaving the overall algorithm in OptimalPartition intact.
Concretely, within src/DistributedTraining/LoadBalancedPartitionStrategy.cs, in the body of OptimalPartition, we will:

- Keep the early exit when numStages >= numLayers as is.
- Extract prefix-sum computation into ComputePrefixSums.
- Extract DP table allocation/initialization into InitializeDpTables.
- Extract filling of the base case into InitializeBaseCase.
- Extract the nested DP loops into FillDpTable.
- Extract the backtracking loop into BacktrackStageEndLayers.
- Extract the final conversion to parameter partitions into BuildPartitionsFromLayers.

These will be added as private methods within the same class, after OptimalPartition or near it, using only existing types (int[], double[], long[], int[][], double[][]) and no new imports. The body of OptimalPartition will be rewritten to compose these helpers, reducing its internal complexity while preserving all functionality.
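For reference, the same min-max DP shown in the diff fits in a compact Python sketch (illustrative only; the function name and return shape are hypothetical, and the C# version additionally maps layer boundaries to parameter offsets):

```python
def optimal_partition(costs, num_stages):
    """Split `costs` into contiguous groups minimizing the maximum group sum.

    Returns (min_max_cost, end_indices) where end_indices[s] is the exclusive
    end of stage s's layer range.
    """
    n = len(costs)
    prefix = [0.0] * (n + 1)
    for i, c in enumerate(costs):
        prefix[i + 1] = prefix[i] + c
    INF = float("inf")
    dp = [[INF] * (n + 1) for _ in range(num_stages + 1)]
    split = [[0] * (n + 1) for _ in range(num_stages + 1)]
    dp[0][0] = 0.0
    for s in range(1, num_stages + 1):
        for l in range(s, n + 1):
            for k in range(s - 1, l):           # try every split for the last stage
                cand = max(dp[s - 1][k], prefix[l] - prefix[k])
                if cand < dp[s][l]:
                    dp[s][l], split[s][l] = cand, k
    bounds, cur = [], n                          # backtrack stage end layers
    for s in range(num_stages, 0, -1):
        bounds.append(cur)
        cur = split[s][cur]
    return dp[num_stages][n], bounds[::-1]
```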
ConfigureDistributedTraining() now accepts optional pipeline-specific parameters (schedule, partition strategy, checkpoint config, micro-batch size) that are passed through to PipelineParallelModel when the user selects DistributedStrategy.PipelineParallel. All parameters are optional with backward-compatible defaults. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/AiModelBuilder.cs (1)
3703-3771: ⚠️ Potential issue | 🔴 Critical

Blocking: validate pipelineMicroBatchSize before storing it. This is a public API input; non-positive values can cause invalid scheduling or runtime errors. Fail fast with an explicit guard.
✅ Suggested fix

 public IAiModelBuilder<T, TInput, TOutput> ConfigureDistributedTraining(
     ICommunicationBackend<T>? backend = null,
     DistributedStrategy strategy = DistributedStrategy.DDP,
     IShardingConfiguration<T>? configuration = null,
     IPipelineSchedule? pipelineSchedule = null,
     IPipelinePartitionStrategy<T>? pipelinePartitionStrategy = null,
     ActivationCheckpointConfig? pipelineCheckpointConfig = null,
     int pipelineMicroBatchSize = 1)
 {
+    if (strategy == DistributedStrategy.PipelineParallel && pipelineMicroBatchSize <= 0)
+    {
+        throw new ArgumentOutOfRangeException(
+            nameof(pipelineMicroBatchSize),
+            "Pipeline micro-batch size must be >= 1.");
+    }
     _distributedBackend = backend;
     _distributedStrategy = strategy;
     _distributedConfiguration = configuration;
     _pipelineSchedule = pipelineSchedule;
     _pipelinePartitionStrategy = pipelinePartitionStrategy;
     _pipelineCheckpointConfig = pipelineCheckpointConfig;
     _pipelineMicroBatchSize = pipelineMicroBatchSize;
     return this;
 }

As per coding guidelines: "Production Readiness (CRITICAL - Flag as BLOCKING) … missing validation of external inputs."
🤖 Fix all issues with AI agents
In `@src/Interfaces/IAiModelBuilder.cs`:
- Around line 769-781: The change to the public interface
IAiModelBuilder<T,TInput,TOutput>.ConfigureDistributedTraining alters its
signature and will break external implementers; restore the original interface
method signature (keep ConfigureDistributedTraining as it was) and move the new
pipeline-specific parameters into a non-breaking alternative such as: add an
overload on the concrete AiModelBuilder class or introduce a
PipelineDistributedOptions/DistributedTrainingOptions object that the facade
AiModelBuilder exposes (or add a new method ConfigurePipelineDistributedTraining
on AiModelBuilder) so external implementations of IAiModelBuilder are unaffected
while still supporting pipelineSchedule, pipelinePartitionStrategy,
pipelineCheckpointConfig and pipelineMicroBatchSize.
…d decomposition

Add 5 new pipeline schedule implementations based on 2024-2025 research:

- ZB-H1: splits backward into B+W, ~1/3 the bubble of 1F1B (same memory)
- ZB-H2: aggressive scheduling for zero bubble (higher memory)
- ZB-V: 2 virtual stages per rank, zero bubble with 1F1B memory
- Interleaved 1F1B: V virtual stages per rank, depth-first ordering
- Looped BFS: V virtual stages per rank, breadth-first ordering

Expand IPipelineSchedule with VirtualStagesPerRank and BackwardInput/BackwardWeight operation types. Update PipelineParallelModel to handle split backward passes with cached input gradients.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
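The B+W decomposition the zero-bubble schedules rely on can be illustrated in a few lines: each backward op is split into an input-gradient op (B, on the critical path, since upstream stages wait for it) and a weight-gradient op (W, deferrable into pipeline bubbles). A hypothetical sketch, not the PR's C# scheduling code:

```python
def split_backward(ops):
    """Split each ("B", m) op into ("B_input", m) now and ("B_weight", m) deferred."""
    out, pending_w = [], []
    for kind, m in ops:
        if kind == "B":
            out.append(("B_input", m))        # must run promptly: unblocks upstream stages
            pending_w.append(("B_weight", m))  # weight grads can fill bubbles later
        else:
            out.append((kind, m))
    out.extend(pending_w)                      # simplest placement: drain at the end
    return out
```

Real ZB schedules interleave the deferred W ops into idle slots rather than draining them at the end; this sketch only shows the decomposition itself.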
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Actionable comments posted: 12
🤖 Fix all issues with AI agents
In `@src/DistributedTraining/Interleaved1F1BSchedule.cs`:
- Around line 95-97: In Interleaved1F1BSchedule.cs, remove the redundant "/ 1"
from the computation of numWarmupForwards (currently: int numWarmupForwards =
Math.Min((totalVirtualStages - 1 - stageId) / 1, numMicroBatches *
_virtualStagesPerRank);); change the left-side expression to simply
(totalVirtualStages - 1 - stageId) so Math.Min compares that value directly with
numMicroBatches * _virtualStagesPerRank, preserving behavior but eliminating the
no-op division.
- Around line 70-84: The validation currently checks stageId before ensuring
numStages is positive; update the checks in Interleaved1F1BSchedule (the
constructor or validation block that references stageId, numStages, and
numMicroBatches) to validate numStages > 0 first, then validate that stageId is
within [0, numStages-1], and keep the numMicroBatches > 0 check as is; reorder
the checks so ArgumentException for numStages comes before the
ArgumentOutOfRangeException for stageId.
- Around line 103-104: Remove the dead tracking arrays forwardCount and
backwardCount from the Interleaved1F1BSchedule class: delete their declarations
(var forwardCount/backwardCount) and remove all statements that mutate them
(increments/assignments where forwardCount[...]++ or backwardCount[...]++
appear), since their values are never read; ensure no other code references
these identifiers and run tests/compile to confirm no remaining references.
In `@src/DistributedTraining/LoopedBFSSchedule.cs`:
- Around line 73-87: The validation for stageId is performed before ensuring
numStages is valid in LoopedBFSSchedule (constructor or validation block); move
the check "if (numStages <= 0) { throw new ArgumentException(...,
nameof(numStages)); }" to run before the "if (stageId < 0 || stageId >=
numStages) { throw new ArgumentOutOfRangeException(nameof(stageId), ...); }"
check so stageId comparisons are only done when numStages is positive, and keep
the existing validation for numMicroBatches as-is (numMicroBatches <= 0).
- Around line 105-106: In LoopedBFSSchedule.cs inside the method computing
vStage, remove the two dead local variables isFirstLoop and isLastLoop (they are
declared as bool isFirstLoop = vStage == 0; and bool isLastLoop = vStage ==
_virtualStagesPerRank - 1;) since they are never used; alternatively, if special
warmup/cooldown logic was intended, replace their declarations with the actual
handling logic referencing vStage and _virtualStagesPerRank, but do not leave
unused locals behind.
In `@src/DistributedTraining/OneForwardOneBackwardSchedule.cs`:
- Around line 52-66: The validation currently checks stageId bounds before
ensuring numStages is positive; in OneForwardOneBackwardSchedule
(constructor/initializer) move the numStages <= 0 check to run before the
stageId range check so you validate the container size first, then verify
stageId is within 0..numStages-1; keep the numMicroBatches <= 0 check as-is and
preserve the same exception types/messages.
In `@src/DistributedTraining/PipelineParallelModel.cs`:
- Around line 209-210: The schedule returned by _schedule.GetSchedule(_stageId,
_numStages, _microBatchSize) may include invalid micro-batch indices; before
executing scheduleOps, validate every op in scheduleOps (use the existing
scheduleOps variable and the types it contains) to ensure any micro-batch index
field is within [0, _microBatchSize - 1] (and optionally that any target
stage/index fields are within valid stage range 0.._numStages-1); if any entry
is out of bounds, throw an ArgumentException or similar with a clear message
identifying the offending op and the expected bounds so invalid
externally-injected IPipelineSchedule implementations fail fast.
In `@src/DistributedTraining/ZeroBubbleH1Schedule.cs`:
- Around line 112-122: Remove the redundant "- 0" from the conditional in
ZeroBubbleH1Schedule: the check currently uses "backwardInputIdx - 0", which is
a no-op; update the if condition in the block that adds a new PipelineOperation
(the variables backwardWeightIdx, backwardInputIdx, and numMicroBatches, and the
creation of a PipelineOperation with Type =
PipelineOperationType.BackwardWeight) to compare backwardWeightIdx directly
against backwardInputIdx (i.e., use "backwardWeightIdx < backwardInputIdx &&
backwardWeightIdx < numMicroBatches") so the logic remains unchanged but the
code is cleaned up.
- Around line 39-53: The validation currently checks stageId bounds before
verifying numStages is positive; move the numStages check (the throw for
numStages <= 0) to run before the stageId range check so that stageId is not
compared against an invalid numStages, and keep the existing numMicroBatches > 0
check as-is; update the validation order in the ZeroBubbleH1Schedule
constructor/method (references: stageId, numStages, numMicroBatches) and apply
the same reordering to the other schedule implementations that use the same
checks.
In `@src/DistributedTraining/ZeroBubbleH2Schedule.cs`:
- Around line 37-51: The validation currently checks stageId before numStages
which can produce misleading errors; in the ZeroBubbleH2Schedule constructor (or
the method that validates inputs), move the check for numStages (numStages <= 0)
to run before the stageId range check, then keep the stageId check (stageId < 0
|| stageId >= numStages) and the numMicroBatches check (numMicroBatches <= 0)
as-is so errors are accurate and deterministic.
In `@src/DistributedTraining/ZeroBubbleVSchedule.cs`:
- Around line 1-263: Extract the repeated parameter checks in
ZeroBubbleVSchedule.GetSchedule into a shared validation helper and call it from
GetSchedule: move the three checks (numStages > 0, stageId in [0,numStages-1],
numMicroBatches > 0) into a new internal static
ScheduleValidation.ValidateGetScheduleParameters(int stageId, int numStages, int
numMicroBatches) and replace the inline checks at the top of
ZeroBubbleVSchedule.GetSchedule with a single call to that helper; apply the
same change to the other schedule classes so all seven schedules use the common
ScheduleValidation helper to remove duplication and ensure consistency.
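The shared helper proposed here, with the corrected check order (container size before index), can be sketched as follows. Illustrative Python; the real helper would be an internal static C# method used by all seven schedules:

```python
def validate_get_schedule_parameters(stage_id: int, num_stages: int,
                                     num_micro_batches: int) -> None:
    """Shared GetSchedule validation: sizes first, then the index into them."""
    if num_stages <= 0:
        raise ValueError("num_stages must be > 0")
    if num_micro_batches <= 0:
        raise ValueError("num_micro_batches must be > 0")
    # Only meaningful once num_stages is known to be valid.
    if not 0 <= stage_id < num_stages:
        raise ValueError(f"stage_id must be in [0, {num_stages - 1}], got {stage_id}")
```

Validating num_stages first means a caller passing (stage_id=5, num_stages=0) gets the accurate "num_stages" error rather than a misleading out-of-range error for stage_id.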
- Around line 51-65: The code in ZeroBubbleVSchedule.cs validates stageId before
checking numStages, which can throw an ArgumentOutOfRangeException when
numStages is invalid; reverse the checks to validate numStages and
numMicroBatches first, then validate stageId. Extract a shared helper method
(e.g., ValidateScheduleParameters or ValidateStageParams) that takes numStages,
numMicroBatches and stageId and performs: (1) numStages > 0, (2) numMicroBatches
> 0, (3) 0 <= stageId < numStages, then call that helper from
ZeroBubbleVSchedule (and other schedule implementations) to ensure consistent
validation across the codebase.
// ZB-H1 bubble: ~(P-1) / (3*M + P - 1)
int p = numStages;
int m = numMicroBatches;
return (double)(p - 1) / (3 * m + p - 1);
Check failure — Code scanning / CodeQL: Possible loss of precision (Error)

Copilot Autofix (AI, about 1 hour ago)
In general, to avoid overflow in integer multiplication when the result is used as a floating-point value, one operand should be cast to a floating-point type before the multiplication. This forces the computation to occur in floating-point arithmetic, which has a much larger dynamic range than the integer type, avoiding integer overflow (though still subject to floating-point range and precision limits).
In this specific case, the denominator 3 * m + p - 1 is computed entirely in long before being cast to double by the division. To avoid overflow while preserving the intended formula, we should ensure that the multiplication 3 * m is carried out in double. The simplest and most localized fix is to cast one of the operands to double in the denominator, e.g., 3.0 * m or (double)m * 3. This changes only the intermediate arithmetic type, not the mathematical expression or the final signature.
Concretely, in src/DistributedTraining/ZeroBubbleH1Schedule.cs, within EstimateBubbleFraction, change line 167 from:

    return (double)(p - 1) / (3 * m + p - 1);

to:

    return (double)(p - 1) / (3.0 * m + p - 1);

No additional imports or helper methods are required. The only change is the literal 3 to 3.0, which promotes the expression to double arithmetic, preventing any long overflow in 3 * m.
@@ -164,6 +164,6 @@
     // ZB-H1 bubble: ~(P-1) / (3*M + P - 1)
     long p = numStages;
     long m = numMicroBatches;
-    return (double)(p - 1) / (3 * m + p - 1);
+    return (double)(p - 1) / (3.0 * m + p - 1);
 }
 }
int p = numStages;
int m = numMicroBatches;
int v = _virtualStagesPerRank;
return (double)(p - 1) / (2 * m * v + p - 1);
Check failure — Code scanning / CodeQL: Possible loss of precision (Error)

Copilot Autofix (AI, about 1 hour ago)
In general, to avoid integer overflow before conversion to floating point, at least one operand in the arithmetic expression should be a floating-point value so the computation is done in floating point rather than in an integral type that can overflow.
Here, the best fix is to perform the denominator calculation in double (or double-compatible) arithmetic from the outset. We can do this by casting one of the operands in the denominator expression to double, for example (double)(2 * m) * v + p - 1 still risks overflow in 2 * m, so the cast needs to happen before any potentially large integer multiplication. A simple and clear option is to cast one of the first operands, e.g. 2.0 * m * v + p - 1 or 2 * (double)m * v + p - 1. This ensures no intermediate integral multiplication can overflow, and it preserves the existing functionality (same mathematical formula, but safe evaluation). The change is localized to the EstimateBubbleFraction method in src/DistributedTraining/LoopedBFSSchedule.cs, at line 184; no new imports or additional methods are needed.
```diff
@@ -181,6 +181,6 @@
     long p = numStages;
     long m = numMicroBatches;
     long v = _virtualStagesPerRank;
-    return (double)(p - 1) / (2 * m * v + p - 1);
+    return (double)(p - 1) / (2.0 * m * v + p - 1);
 }
}
```
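The interleaved variant of the estimate, (P-1)/(2MV + P - 1), also shows why virtual stages help: increasing V deepens the pipeline without adding ranks. A small sketch (Python, hypothetical name, illustration only):

```python
def interleaved_bubble_fraction(num_stages: int,
                                num_micro_batches: int,
                                virtual_stages_per_rank: int) -> float:
    """Bubble estimate ~(P-1)/(2MV + P - 1) for interleaved schedules.

    The 2.0 literal keeps the denominator in floating point, matching
    the CodeQL-suggested fix for the C# code above.
    """
    p, m, v = num_stages, num_micro_batches, virtual_stages_per_rank
    return (p - 1) / (2.0 * m * v + p - 1)

# Doubling the virtual stages per rank roughly halves the bubble.
print(interleaved_bubble_fraction(4, 4, 1))  # ~0.273
print(interleaved_bubble_fraction(4, 4, 2))  # ~0.158
```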
…s, micro-batch slicing, and checkpoint recomputation

- Add IPipelineDecomposableModel<T> interface for true B/W split (BackwardInput/BackwardWeight)
- Emulated B/W split fallback when model doesn't implement decomposition
- Virtual stage partitioning with non-contiguous chunk assignment per rank
- Proper micro-batch slicing via vector conversion with graceful fallback
- Activation checkpoint recomputation from nearest earlier checkpoint
- Virtual-stage-aware communication routing with unique tags

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/DistributedTraining/PipelineParallelModel.cs (1)
151-158: ⚠️ Potential issue | 🔴 Critical — Block invalid VirtualStagesPerRank values.

A schedule returning 0/negative will cause divide-by-zero in InitializeSharding and invalid tag math. Validate upfront. As per coding guidelines: "Production Readiness (CRITICAL - Flag as BLOCKING)... missing validation of external inputs".

🛡️ Proposed fix

```diff
 _virtualStagesPerRank = _schedule.VirtualStagesPerRank;
+if (_virtualStagesPerRank < 1)
+{
+    throw new InvalidOperationException("VirtualStagesPerRank must be at least 1.");
+}
 _totalVirtualStages = _numStages * _virtualStagesPerRank;
```
🤖 Fix all issues with AI agents
In `@src/DistributedTraining/PipelineParallelModel.cs`:
- Around line 583-628: GetStageInput currently falls back to the original
micro-batch for non-first virtual stages on the same rank instead of using the
previous virtual stage's forward output; replace that fallback logic in
GetStageInput so that when virtualStageIndex > 0 and not receiving from a
previous rank you lookup the previous virtual stage's output from forwardOutputs
using the prior op key (opKey - 1 or the equivalent key construction used
elsewhere), convert that Vector<T> to TInput via
ConversionsHelper.ConvertVectorToInputWithoutReference<T, TInput>(...), and
return it; only if forwardOutputs does not contain the prior-stage output then
fall back to microBatches and otherwise throw the existing
InvalidOperationException.
- Around line 678-695: The ShouldCheckpointActivation method currently does an
opKey % _checkpointConfig.CheckpointEveryNLayers which can divide by zero; add a
guard that checks _checkpointConfig.CheckpointEveryNLayers > 0 before using the
modulo (e.g., if <= 0, treat checkpointing interval as disabled and return false
or surface a configuration error), updating ShouldCheckpointActivation to first
validate _checkpointConfig.CheckpointEveryNLayers and avoid the modulo when it
is non-positive.
- Around line 723-754: The nearest-checkpoint search in the recompute block can
pick checkpoints from other micro-batches because it only checks opKey against
_checkpointedActivations; change the search to restrict candidates to the
current micro-batch (e.g. start searchKey at microBatchIndex * V and only
consider keys in range microBatchIndex * V .. opKey-1) or refactor checkpoint
storage/lookup to use a composite key (microBatchIndex, virtualStageIndex) so
you only fetch a checkpoint from the same micro-batch; update the loop that sets
nearestCheckpointKey and the subsequent access of _checkpointedActivations to
use the new range or composite key check before converting checkpointVector and
running WrappedModel/Predict via ConversionsHelper.
- Around line 452-569: In SliceInputIntoMicroBatches and
SliceTargetIntoMicroBatches, stop silently duplicating data when conversion
fails or microBatchElements <= 0; instead throw a clear exception (e.g.,
ArgumentException or InvalidOperationException) indicating the provided
TInput/TOutput is not sliceable for the configured _microBatchSize (include
_microBatchSize and a brief context in the message). Replace the conversion
catch blocks and the microBatchElements <= 0 branches so they throw rather than
populate all slices, and ensure the exception type and message make it obvious
which method (SliceInputIntoMicroBatches or SliceTargetIntoMicroBatches) and
which parameter (input/target) caused the failure.
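The fail-fast slicing policy requested above can be sketched in a few lines. This is a language-agnostic illustration (Python, hypothetical function name), not the library's TInput/Vector<T> conversion path; the point is that an unsliceable batch raises instead of silently duplicating data into every micro-batch:

```python
def slice_into_micro_batches(batch, micro_batch_size):
    """Split a batch into equal micro-batches, failing fast on bad input.

    Silently copying the full batch into every slice would make each
    micro-batch gradient identical, corrupting the gradient average.
    """
    if micro_batch_size <= 0:
        raise ValueError(f"micro_batch_size must be positive, got {micro_batch_size}")
    if len(batch) % micro_batch_size != 0:
        raise ValueError(
            f"batch of {len(batch)} elements is not divisible into "
            f"micro-batches of size {micro_batch_size}")
    return [batch[i:i + micro_batch_size]
            for i in range(0, len(batch), micro_batch_size)]

print(slice_into_micro_batches([1, 2, 3, 4, 5, 6], 2))  # [[1, 2], [3, 4], [5, 6]]
```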
…pipeline parallelism

- Add missing AiDotNet.Generators compile exclusion and ProjectReference to csproj (fixes CS0579 duplicate assembly attributes build error)
- Add property setter validation in ActivationCheckpointConfig (CheckpointEveryNLayers >= 1, MaxActivationsInMemory >= 0)
- Reorder GPipeSchedule validation to check numStages/numMicroBatches before stageId
- Add _isAutoDetect flag and boundary validation to LoadBalancedPartitionStrategy
- Add tag constants (ActivationTagBase, GradientTagBase, PredictTagBase) to prevent communication collisions in PipelineParallelModel
- Add partition validation, checkpointing fail-fast, and internal property visibility

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…ality issues

- Reorder validation in all 7 schedule classes: check numStages/numMicroBatches before stageId
- Fix integer overflow in EstimateBubbleFraction across all schedules (use long arithmetic)
- Remove unused variables: forwardCount/backwardCount (Interleaved1F1B), isFirstLoop/isLastLoop (LoopedBFS), totalVirtualStages/totalWarmupForwards (ZeroBubbleV)
- Remove redundant operations: / 1 (Interleaved1F1B), - 0 (ZeroBubbleH1)
- Replace generic catch clauses with specific InvalidOperationException in PipelineParallelModel
- Combine nested if statements in GetStageInput
- Remove unused globalVirtualStageId variable
- Use ternary operator for cost estimation in LoadBalancedPartitionStrategy

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…t micro-batch slicing

- Split ConfigureDistributedTraining (7 params) into ConfigureDistributedTraining (3 params) + ConfigurePipelineParallelism (4 params) to avoid breaking the interface
- Fix virtual-stage routing: non-first virtual stages now use forward output from the previous virtual stage instead of falling back to raw micro-batch input
- Fail fast on micro-batch slicing failures instead of silently duplicating data to all micro-batches (which produces incorrect gradient averages)
- Apply partition strategy for multi-stage (V>1) schedules instead of ignoring it
- Limit checkpoint recompute search to current micro-batch boundaries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

…accuracy

- Validate schedule output bounds (MicroBatchIndex, VirtualStageIndex) before executing ops to guard against externally injectable schedules
- Integrate CheckpointFirstLayer config into ShouldCheckpointActivation
- Add defensive guard for CheckpointEveryNLayers modulo-by-zero
- Fix cost estimator doc to correctly explain paramCount^1.5 derivation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
src/DistributedTraining/PipelineParallelModel.cs:1
- The variable 'leftover' should be renamed to 'remainder' to match the naming convention used in the multi-stage block at line 226 and maintain consistency throughout the codebase.
using AiDotNet.Interfaces;
Actionable comments posted: 10
🤖 Fix all issues with AI agents
In `@src/AiDotNet.csproj`:
- Around line 142-146: The project currently enables EmitCompilerGeneratedFiles
and sets CompilerGeneratedFilesOutputPath unconditionally; move these entries
into a configuration-gated PropertyGroup (e.g. PropertyGroup with
Condition="'$(Configuration)' == 'Debug'") or wrap them behind a dedicated
MSBuild property so they only apply in Debug (or when the property is explicitly
enabled), referencing the existing EmitCompilerGeneratedFiles and
CompilerGeneratedFilesOutputPath property names and the PropertyGroup element to
locate where to change.
In `@src/AiModelBuilder.cs`:
- Around line 3738-3796: The public ConfigurePipelineParallelism method
currently accepts non-positive microBatchSize which breaks downstream pipeline
scheduling; update ConfigurePipelineParallelism to validate microBatchSize > 0
and fail fast by throwing an ArgumentOutOfRangeException (or ArgumentException)
with a clear message when invalid, before assigning to _pipelineMicroBatchSize;
reference the method name ConfigurePipelineParallelism, parameter
microBatchSize, and field _pipelineMicroBatchSize when making the change.
In `@src/DistributedTraining/ActivationCheckpointConfig.cs`:
- Around line 56-105: The ArgumentOutOfRangeException calls inside the
CheckpointEveryNLayers setter and the MaxActivationsInMemory setter use
nameof(value) which yields "value"; change those to use the property names
instead (replace nameof(value) with nameof(CheckpointEveryNLayers) in the
CheckpointEveryNLayers setter and with nameof(MaxActivationsInMemory) in the
MaxActivationsInMemory setter) so the thrown exceptions reference the actual
property names and match .NET patterns.
In `@src/DistributedTraining/PipelineParallelModel.cs`:
- Around line 811-844: The recomputation block guarded by
_checkpointConfig.Enabled and _checkpointConfig.RecomputeStrategy !=
RecomputeStrategy.None (which uses _checkpointedActivations, ConversionsHelper,
WrappedModel.Predict, etc.) is dead because the PipelineParallelModel
constructor throws NotImplementedException for any RecomputeStrategy other than
None; either remove this entire block to avoid maintaining unreachable code or
retain it but add a clear comment/TODO above it explaining it is
placeholder/infrastructure for future recompute support and currently
unreachable due to the constructor guard so reviewers won’t treat it as active
logic.
- Around line 892-911: AccumulateGradients currently assumes accumulated.Length
== newGradients.Length; add a defensive check at the start of the method (after
the accumulated null check) that validates newGradients is non-null and that
accumulated.Length == newGradients.Length, and if not throw an ArgumentException
(or ArgumentNullException if newGradients is null) with a clear message
containing the two lengths and parameter names; keep the existing clone and
element-wise addition logic in AccumulateGradients unchanged.
- Around line 125-128: EstimatedBubbleFraction currently calls
_schedule.EstimateBubbleFraction(_numStages, _microBatchSize) but _numStages is
only set in OnBeforeInitializeSharding/InitializeSharding, so access before
initialization yields invalid results; update the EstimatedBubbleFraction getter
to guard against uninitialized state by checking _numStages (or an
initialization flag set in OnBeforeInitializeSharding/InitializeSharding) and
either throw a clear InvalidOperationException or return a safe default (e.g.,
0) when not initialized, referencing EstimatedBubbleFraction, _numStages,
OnBeforeInitializeSharding/InitializeSharding and
_schedule.EstimateBubbleFraction so reviewers can locate and apply the fix.
- Around line 206-234: The multi-stage branch that uses _partitionStrategy and
vsPartitions currently only checks count; add the same per-partition bounds
validation used in the single-stage path: for each entry returned by
ComputePartition(totalParams, _totalVirtualStages) verify StartIndex >= 0, Size
>= 0, and StartIndex + Size <= totalParams, and throw an
InvalidOperationException (including which partition index and offending values)
if any check fails so you catch out-of-range or negative partitions before they
are used by Array.Copy or later logic.
In `@src/DistributedTraining/ZeroBubbleH2Schedule.cs`:
- Around line 34-79: GetSchedule validates stageId but never uses it, producing
identical schedules for all stages; change the logic to make the schedule
stage-local by using stageId to decide which global timesteps this stage
actually executes: compute global forward timesteps and only add a
PipelineOperation.Forward when the global forward counter maps to this stage
(e.g., globalForwardIndex % numStages == stageId), set MicroBatchIndex using
that globalForwardIndex-to-microbatch mapping, and similarly for backward passes
use a globalBackwardIndex where you add a PipelineOperation.Backward only when
(globalBackwardIndex % numStages == stageId) (or the equivalent reverse-stage
mapping used by your other schedulers); adjust
numWarmupForwards/forwardIdx/steadyStateCount/backwardInputIdx/backwardWeightIdx
calculations so they operate on global counters and derive per-stage counts by
modular filtering, and ensure IsWarmup/IsCooldown flags are computed from the
global timeline so later stages don’t run work before inputs are available
(update GetSchedule, numWarmupForwards, forwardIdx, steadyStateCount,
backwardInputIdx, backwardWeightIdx and the PipelineOperation creation sites).
In `@src/DistributedTraining/ZeroBubbleVSchedule.cs`:
- Around line 185-209: The BackwardWeight ops are being marked IsCooldown = true
unconditionally; compute the isCooldown flag once per loop iteration like you do
for BackwardInput and reuse it for both BackwardWeight branches. Update the code
that builds PipelineOperation for BackwardWeight (the blocks that set Type =
PipelineOperationType.BackwardWeight and touch
MicroBatchIndex/backwardWeightCount0/backwardWeightCount1/VirtualStageIndex) to
set IsCooldown = isCooldown (the same local boolean computed earlier in the
loop) instead of hardcoding true, and ensure the increment of
backwardWeightCount0/backwardWeightCount1 remains unchanged.
In `@src/Interfaces/IAiModelBuilder.cs`:
- Line 789: Rename the ambiguous parameter microBatchSize to microBatchCount in
the IAiModelBuilder interface and any related method signatures (e.g.,
IAiModelBuilder.BuildModel / methods referencing microBatchSize) and update the
XML doc comment from "Number of micro-batches" to explicitly "Number of
micro-batches (micro-batch count for pipeline execution)". Ensure all usages,
XML <param> tags, and any implementing classes or callers are updated to use
microBatchCount to keep names consistent.
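One of the review items above asks AccumulateGradients to validate gradient lengths before element-wise addition. A minimal sketch of that defensive check (Python, hypothetical names; the real method works on the library's gradient vectors, not lists):

```python
def accumulate_gradients(accumulated, new_gradients):
    """Element-wise gradient accumulation with defensive shape checks.

    A length mismatch usually means two pipeline stages disagree about
    the partition layout, so failing loudly beats silent corruption.
    """
    if accumulated is None:
        raise ValueError("accumulated must not be None")
    if new_gradients is None:
        raise ValueError("new_gradients must not be None")
    if len(accumulated) != len(new_gradients):
        raise ValueError(
            f"gradient length mismatch: accumulated has {len(accumulated)} "
            f"elements, new gradients have {len(new_gradients)}")
    return [a + g for a, g in zip(accumulated, new_gradients)]

print(accumulate_gradients([1.0, 2.0], [3.0, 4.0]))  # [4.0, 6.0]
```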
- AiModelBuilder: validate microBatchCount > 0, rename from microBatchSize
- IAiModelBuilder: rename microBatchSize to microBatchCount for clarity
- PipelineParallelModel: add multi-stage partition bounds validation, pre-init guard on EstimatedBubbleFraction, gradient length check, comment on unreachable recompute code, debug log for forced partition
- ZeroBubbleH2Schedule: make warmup count stage-dependent using stageId
- ZeroBubbleVSchedule: fix BackwardWeight IsCooldown to use computed flag instead of hardcoded true
- ActivationCheckpointConfig: use property names in exceptions
- AiDotNet.csproj: gate EmitCompilerGeneratedFiles to Debug config only

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… tfm mismatch

- Remove duplicate source generator sections from merge with master
- Add SetTargetFramework=netstandard2.0 to generator ProjectReference to prevent MSBuild from building it for net10.0/net471

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…/github.com/ooples/AiDotNet into feat/pipeline-parallelism-optimizations-463
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
```csharp
{
    if (numStages <= 0)
    {
        throw new ArgumentException("Number of stages must be positive.", nameof(numStages));
    }

    if (numMicroBatches <= 0)
    {
        throw new ArgumentException("Number of micro-batches must be positive.", nameof(numMicroBatches));
    }

    if (stageId < 0 || stageId >= numStages)
    {
        throw new ArgumentOutOfRangeException(nameof(stageId),
            $"Stage ID must be between 0 and {numStages - 1}.");
    }

    var ops = new List<PipelineOperation>();

    // ZB-H1 follows 1F1B structure but splits backward into B + W
    // Key constraint: maintain same number of in-flight micro-batches as 1F1B
    // (i.e., at most numStages micro-batches stored at once)

    int numWarmupForwards = Math.Min(numStages - 1 - stageId, numMicroBatches);
    int numSteadyState = Math.Max(0, numMicroBatches - numWarmupForwards);

    // Phase 1: Warmup - forward passes only (same as 1F1B)
    int forwardIdx = 0;
    for (int i = 0; i < numWarmupForwards; i++)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Forward,
            MicroBatchIndex = forwardIdx,
            IsWarmup = true,
            IsCooldown = false
        });
        forwardIdx++;
    }

    // Phase 2: Steady state - 1F-1B-1W pattern
    // For each steady-state step: one Forward, one BackwardInput, and
    // schedule BackwardWeight for the micro-batch that completed B earliest.
    int backwardInputIdx = 0;
    int backwardWeightIdx = 0;

    for (int i = 0; i < numSteadyState; i++)
    {
        // Forward
        if (forwardIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardIdx,
                IsWarmup = false,
                IsCooldown = false
            });
            forwardIdx++;
        }

        // BackwardInput (B) - on the critical path
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardInput,
            MicroBatchIndex = backwardInputIdx,
            IsWarmup = false,
            IsCooldown = false
        });
        backwardInputIdx++;

        // BackwardWeight (W) - fills bubbles, scheduled for earlier micro-batch
        // ZB-H1 constraint: W starts only after enough B steps to maintain
        // the same in-flight count as 1F1B
        if (backwardWeightIdx < backwardInputIdx && backwardWeightIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardWeight,
                MicroBatchIndex = backwardWeightIdx,
                IsWarmup = false,
                IsCooldown = false
            });
            backwardWeightIdx++;
        }
    }

    // Phase 3: Cooldown - remaining B and W passes
    while (backwardInputIdx < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardInput,
            MicroBatchIndex = backwardInputIdx,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardInputIdx++;
    }

    // Drain remaining W passes
    while (backwardWeightIdx < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardWeight,
            MicroBatchIndex = backwardWeightIdx,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardWeightIdx++;
    }

    return ops;
}
```
Check notice — Code scanning / CodeQL: Block with too many statements (Note)

Copilot Autofix
In general, to fix “block with too many statements” issues, you extract coherent subsets of logic into well-named helper methods. This reduces the number of statements and “complex” constructs in any single block, making the code easier to understand and maintain without changing observable behavior.

For this method, the best approach is to keep GetSchedule as a high-level orchestrator and move each major phase into private helpers:

- Keep argument validation in GetSchedule.
- Compute numWarmupForwards, numSteadyState, initialize indexes and the ops list in GetSchedule.
- Extract:
  - Phase 1 warmup loop into AddWarmupForwards.
  - Phase 2 steady-state 1F-1B-1W loop into AddSteadyStateOps.
  - Phase 3 cooldown B and W draining loops into AddCooldownOps.

Each helper will accept the list and the relevant counters (via ref for the indices) and append operations. This preserves the behavior exactly while reducing the number of complex statements directly in the GetSchedule body.

Concretely, in src/DistributedTraining/ZeroBubbleH1Schedule.cs:

- Add three new private static helper methods inside ZeroBubbleH1Schedule:
  - AddWarmupForwards(List<PipelineOperation> ops, int numWarmupForwards, ref int forwardIdx);
  - AddSteadyStateOps(List<PipelineOperation> ops, int numSteadyState, int numMicroBatches, ref int forwardIdx, ref int backwardInputIdx, ref int backwardWeightIdx);
  - AddCooldownOps(List<PipelineOperation> ops, int numMicroBatches, ref int backwardInputIdx, ref int backwardWeightIdx);
- Replace the inlined loops in GetSchedule with calls to these methods.
- No new imports or external dependencies are needed; everything uses existing types (List<PipelineOperation>, etc.).

This reduces the number of complex statements in GetSchedule while keeping behavior and public API unchanged.
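The three phases the autofix proposes extracting can be condensed into a language-agnostic sketch. The Python below (hypothetical function name, an illustration of the C# method's logic, with warmup/cooldown flags omitted) generates the per-stage ZB-H1 operation list and shows that every stage ends up issuing exactly one F, B, and W per micro-batch:

```python
def zb_h1_schedule(stage_id, num_stages, num_micro_batches):
    """Sketch of the ZB-H1 per-stage schedule: warmup, 1F-1B-1W, cooldown."""
    ops = []
    warmup = min(num_stages - 1 - stage_id, num_micro_batches)
    f = b = w = 0
    # Phase 1: warmup forwards only
    for _ in range(warmup):
        ops.append(("F", f)); f += 1
    # Phase 2: steady state - one Forward, one BackwardInput, one BackwardWeight
    for _ in range(max(0, num_micro_batches - warmup)):
        if f < num_micro_batches:
            ops.append(("F", f)); f += 1
        ops.append(("B", b)); b += 1
        if w < b and w < num_micro_batches:
            ops.append(("W", w)); w += 1
    # Phase 3: cooldown drains remaining B, then remaining W
    while b < num_micro_batches:
        ops.append(("B", b)); b += 1
    while w < num_micro_batches:
        ops.append(("W", w)); w += 1
    return ops

print(zb_h1_schedule(0, 4, 4)[:6])  # first ops of stage 0 in a 4-stage pipeline
```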
```csharp
{
    if (numStages <= 0)
    {
        throw new ArgumentException("Number of stages must be positive.", nameof(numStages));
    }

    if (numMicroBatches <= 0)
    {
        throw new ArgumentException("Number of micro-batches must be positive.", nameof(numMicroBatches));
    }

    if (stageId < 0 || stageId >= numStages)
    {
        throw new ArgumentOutOfRangeException(nameof(stageId),
            $"Stage ID must be between 0 and {numStages - 1}.");
    }

    var ops = new List<PipelineOperation>();

    // ZB-H2 allows more warmup forwards than 1F1B to fill the pipeline more aggressively.
    // The key difference from ZB-H1: we allow up to (numStages - 1) additional in-flight
    // micro-batches, which uses more memory but fills all bubbles.

    // Extended warmup: later stages (higher stageId) get fewer warmup forwards
    // because their inputs arrive later in the pipeline.
    // Stage 0 gets up to numStages warmup forwards, stage (numStages-1) gets 1.
    int numWarmupForwards = Math.Min(numStages - stageId, numMicroBatches);

    // Phase 1: Extended warmup - more forward passes to fill pipeline completely
    int forwardIdx = 0;
    for (int i = 0; i < numWarmupForwards; i++)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Forward,
            MicroBatchIndex = forwardIdx,
            IsWarmup = true,
            IsCooldown = false
        });
        forwardIdx++;
    }

    // Phase 2: Steady state - interleave F, B, W to maintain zero bubble
    int backwardInputIdx = 0;
    int backwardWeightIdx = 0;
    int steadyStateCount = Math.Max(0, numMicroBatches - numWarmupForwards);

    for (int i = 0; i < steadyStateCount; i++)
    {
        // BackwardInput (B) first - critical path
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardInput,
            MicroBatchIndex = backwardInputIdx,
            IsWarmup = false,
            IsCooldown = false
        });
        backwardInputIdx++;

        // Forward for next micro-batch
        if (forwardIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardIdx,
                IsWarmup = false,
                IsCooldown = false
            });
            forwardIdx++;
        }

        // BackwardWeight (W) - fills any remaining time
        if (backwardWeightIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardWeight,
                MicroBatchIndex = backwardWeightIdx,
                IsWarmup = false,
                IsCooldown = false
            });
            backwardWeightIdx++;
        }
    }

    // Phase 3: Cooldown - drain remaining B and W
    while (backwardInputIdx < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardInput,
            MicroBatchIndex = backwardInputIdx,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardInputIdx++;

        // Interleave W during cooldown
        if (backwardWeightIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardWeight,
                MicroBatchIndex = backwardWeightIdx,
                IsWarmup = false,
                IsCooldown = true
            });
            backwardWeightIdx++;
        }
    }

    // Final W drain
    while (backwardWeightIdx < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardWeight,
            MicroBatchIndex = backwardWeightIdx,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardWeightIdx++;
    }

    return ops;
}
```
Check notice — Code scanning / CodeQL: Block with too many statements (Note)

Copilot Autofix
In general, to fix a “block with too many statements” issue, you extract logically distinct parts of the method into smaller, well-named helper methods. The top-level method then becomes a simple orchestrator that validates input, initializes shared state, and delegates to helpers. This reduces the number of complex statements (loops/conditionals) in the flagged block while preserving functionality.

For this specific case, the best approach is to keep GetSchedule as the orchestration entry point and move each conceptual phase into its own private method:

- Keep the initial argument validation in GetSchedule (it’s simple and clear).
- After creating ops, compute numWarmupForwards, initialize the index variables, and then:
  - Call a new AddWarmupForwards helper to implement “Phase 1: Extended warmup”.
  - Call a new AddSteadyStateOperations helper to implement “Phase 2: Steady state”.
  - Call a new AddCooldownOperations helper to implement “Phase 3: Cooldown”.
  - Call a new AddFinalWeightDrain helper to implement the final W drain.
- Each helper receives the ops list and relevant counts/indices (by reference where needed), and performs the corresponding loop/conditionals internally.

This keeps behavior identical while reducing the number of loops and conditional blocks directly inside GetSchedule. No new external dependencies are required; we only add private methods within ZeroBubbleH2Schedule in the same file. All changes are confined to src/DistributedTraining/ZeroBubbleH2Schedule.cs and within the shown class.
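The only scheduling constant that differs between the two variants shown above is the warmup depth: ZB-H2 admits one more in-flight micro-batch per stage than ZB-H1 (min(P - s, M) versus min(P - 1 - s, M)), trading memory for fewer bubbles. A tiny sketch (Python, hypothetical helper, illustration only) makes the contrast concrete:

```python
def warmup_forwards(stage_id, num_stages, num_micro_batches, variant="zb-h2"):
    """Warmup forward count per stage for the ZB-H1 vs ZB-H2 variants.

    ZB-H2 uses min(P - s, M); ZB-H1 uses min(P - 1 - s, M), i.e. one
    fewer in-flight micro-batch per stage.
    """
    slack = (num_stages - stage_id if variant == "zb-h2"
             else num_stages - 1 - stage_id)
    return min(slack, num_micro_batches)

# Stage 0 of a 4-stage pipeline with 8 micro-batches:
print(warmup_forwards(0, 4, 8, "zb-h2"))  # 4
print(warmup_forwards(0, 4, 8, "zb-h1"))  # 3
```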
```csharp
{
    if (numStages <= 0)
    {
        throw new ArgumentException("Number of stages must be positive.", nameof(numStages));
    }

    if (numMicroBatches <= 0)
    {
        throw new ArgumentException("Number of micro-batches must be positive.", nameof(numMicroBatches));
    }

    if (stageId < 0 || stageId >= numStages)
    {
        throw new ArgumentOutOfRangeException(nameof(stageId),
            $"Stage ID must be between 0 and {numStages - 1}.");
    }

    var ops = new List<PipelineOperation>();

    // ZB-V uses exactly 2 virtual stages per rank (V=2).
    // Virtual stage IDs for rank stageId: stageId (chunk 0) and stageId + numStages (chunk 1).
    //
    // The schedule interleaves F/B/W operations across both virtual stages:
    // - Forward on virtual stage 0 (chunk 0)
    // - Forward on virtual stage 1 (chunk 1)
    // - BackwardInput on virtual stage 1 (chunk 1, reverse order)
    // - BackwardInput on virtual stage 0 (chunk 0, reverse order)
    // - BackwardWeight fills any remaining gaps

    // Warmup: forwards across both virtual stages
    // Number of warmup forwards scales with position in pipeline
    int warmupForwardsPerChunk = Math.Min(numStages - 1 - stageId, numMicroBatches);

    int forwardCount0 = 0; // Forward count for virtual stage 0
    int forwardCount1 = 0; // Forward count for virtual stage 1
    int backwardInputCount0 = 0;
    int backwardInputCount1 = 0;
    int backwardWeightCount0 = 0;
    int backwardWeightCount1 = 0;

    // Phase 1: Warmup - interleaved forwards across both virtual stages
    // Depth-first: complete a microbatch through both chunks before starting next
    for (int i = 0; i < warmupForwardsPerChunk && forwardCount0 < numMicroBatches; i++)
    {
        // Forward on chunk 0
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Forward,
            MicroBatchIndex = forwardCount0,
            VirtualStageIndex = 0,
            IsWarmup = true,
            IsCooldown = false
        });
        forwardCount0++;

        // Forward on chunk 1 for the same microbatch (if chunk 0 output is ready)
        if (forwardCount1 < forwardCount0 && forwardCount1 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardCount1,
                VirtualStageIndex = 1,
                IsWarmup = true,
                IsCooldown = false
            });
            forwardCount1++;
        }
    }

    // Phase 2: Steady state - F0, F1, B1, B0, W interleaving
    // Continue until all forwards and backwards are complete
    while (forwardCount0 < numMicroBatches ||
           forwardCount1 < numMicroBatches ||
           backwardInputCount0 < numMicroBatches ||
           backwardInputCount1 < numMicroBatches)
    {
        bool isCooldown = forwardCount0 >= numMicroBatches && forwardCount1 >= numMicroBatches;

        // Forward on chunk 0 (if available)
        if (forwardCount0 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardCount0,
                VirtualStageIndex = 0,
                IsWarmup = false,
                IsCooldown = false
            });
            forwardCount0++;
        }

        // Forward on chunk 1 (if chunk 0 has produced output for this microbatch)
        if (forwardCount1 < forwardCount0 && forwardCount1 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardCount1,
                VirtualStageIndex = 1,
                IsWarmup = false,
                IsCooldown = false
            });
            forwardCount1++;
        }

        // BackwardInput on chunk 1 (reverse order - B step, critical path)
        if (backwardInputCount1 < forwardCount1 && backwardInputCount1 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardInput,
                MicroBatchIndex = backwardInputCount1,
                VirtualStageIndex = 1,
                IsWarmup = false,
                IsCooldown = isCooldown
            });
            backwardInputCount1++;
        }

        // BackwardInput on chunk 0 (after chunk 1's B is done for this microbatch)
        if (backwardInputCount0 < backwardInputCount1 && backwardInputCount0 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardInput,
                MicroBatchIndex = backwardInputCount0,
                VirtualStageIndex = 0,
                IsWarmup = false,
                IsCooldown = isCooldown
            });
            backwardInputCount0++;
        }

        // BackwardWeight (W) - fills bubbles, process whichever chunk has pending W
        if (backwardWeightCount1 < backwardInputCount1 && backwardWeightCount1 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardWeight,
                MicroBatchIndex = backwardWeightCount1,
                VirtualStageIndex = 1,
                IsWarmup = false,
                IsCooldown = isCooldown
            });
            backwardWeightCount1++;
        }

        if (backwardWeightCount0 < backwardInputCount0 && backwardWeightCount0 < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.BackwardWeight,
                MicroBatchIndex = backwardWeightCount0,
                VirtualStageIndex = 0,
                IsWarmup = false,
                IsCooldown = isCooldown
            });
            backwardWeightCount0++;
        }
    }

    // Phase 3: Drain remaining BackwardWeight operations
    while (backwardWeightCount1 < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardWeight,
            MicroBatchIndex = backwardWeightCount1,
            VirtualStageIndex = 1,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardWeightCount1++;
    }

    while (backwardWeightCount0 < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.BackwardWeight,
            MicroBatchIndex = backwardWeightCount0,
            VirtualStageIndex = 0,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardWeightCount0++;
    }

    return ops;
}
```
Check notice - Code scanning / CodeQL: Block with too many statements

Copilot Autofix suggestion:
In general, to fix a "block with too many statements" warning, you factor out logically distinct pieces of the method into well-named helper methods. This reduces the number of complex statements (loops, conditionals) in the original block and makes each method shorter and easier to understand, without changing behavior.
For `GetSchedule`, the single best fix is to keep argument validation and simple setup in `GetSchedule`, and move the three conceptual phases of the schedule generation into private helper methods:

- `AddWarmupPhase(...)` for the initial interleaved forward passes.
- `AddSteadyStatePhase(...)` for the F0/F1/B1/B0/W loop.
- `AddDrainPhase(...)` for draining remaining `BackwardWeight` operations.

Each helper should accept the ops list and the counter variables by ref so the state is preserved exactly as before. This lets us move each for/while loop and its associated conditionals into its own method, cutting down complex statements within `GetSchedule` itself, while keeping behavior identical.

Concretely:

- In `ZeroBubbleVSchedule.GetSchedule` (around lines 79-239), replace the inline loops for phases 1-3 with three calls to new private methods, passing the current state.
- Add three new `private static` methods inside `ZeroBubbleVSchedule`, below `GetSchedule` (or above `EstimateBubbleFraction`), containing the existing loop bodies with minimal modification, using the passed-in parameters and `ref` counters.
- No new imports or external dependencies are needed; we only rearrange existing logic.
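The same decomposition can be sketched language-agnostically. Below is a minimal Python analogue (the helper names mirror the hypothetical C# ones suggested above, and the `("W", chunk, index)` tuple encoding of operations is invented purely for illustration):

```python
def drain_backward_weight(ops, w_count, num_micro_batches, chunk):
    """Phase 3 helper: emit the remaining BackwardWeight (W) ops for one chunk.

    `w_count` plays the role of the C# ref counter: the caller stores the
    returned value back, so state is preserved exactly as in the inline loop."""
    while w_count < num_micro_batches:
        ops.append(("W", chunk, w_count))
        w_count += 1
    return w_count

def add_drain_phase(ops, w0, w1, num_micro_batches):
    """Drain chunk 1 first, then chunk 0, matching the original ordering."""
    w1 = drain_backward_weight(ops, w1, num_micro_batches, chunk=1)
    w0 = drain_backward_weight(ops, w0, num_micro_batches, chunk=0)
    return w0, w1

ops = [("W", 1, 0)]  # one W op already emitted for chunk 1 during steady state
w0, w1 = add_drain_phase(ops, w0=0, w1=1, num_micro_batches=2)
# ops is now [("W", 1, 0), ("W", 1, 1), ("W", 0, 0), ("W", 0, 1)]
```

Because the helpers only move existing loop bodies, the emitted operation sequence is byte-for-byte identical to the inline version.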
- Use long variables in EstimateBubbleFraction across all 6 schedule classes to prevent integer overflow in numerator arithmetic - Increase communication tag ranges from 100K to 1M between bases to prevent collisions with many micro-batches and virtual stages Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Disable shared compilation (-p:UseSharedCompilation=false) in the SonarCloud analysis build step to prevent VBCSCompiler from holding file locks on AiDotNet.Generators.dll during parallel project builds. Also use long arithmetic in bubble fraction calculations and widen communication tag ranges from 100K to 1M to prevent collisions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
src/DistributedTraining/PipelineParallelModel.cs:1068
`Deserialize()` calls `InitializeSharding()` directly, but `PipelineParallelModel.InitializeSharding()` depends on fields initialized in `OnBeforeInitializeSharding()` (e.g., `_stageId`, `_numStages`, `_virtualStagesPerRank`). Because `EnsureShardingInitialized()` is bypassed here, deserialization can leave these fields at defaults and produce incorrect partitioning (or throw) depending on whether sharding had previously been initialized.

Suggested fix: reset the sharding init state and call `EnsureShardingInitialized()` (or explicitly call `OnBeforeInitializeSharding()` before `InitializeSharding()`), so derived-class pre-init always runs during deserialization.
```csharp
public override void Deserialize(byte[] data)
{
    using var ms = new MemoryStream(data);
    using var reader = new BinaryReader(ms);
    int savedWorldSize = reader.ReadInt32();
    int savedRank = reader.ReadInt32();
    int savedMicroBatchSize = reader.ReadInt32();
    reader.ReadBoolean(); // AutoSyncGradients
    reader.ReadInt32(); // MinimumParameterGroupSize
    reader.ReadBoolean(); // EnableGradientCompression
    reader.ReadString(); // Schedule name (informational)
    reader.ReadBoolean(); // Checkpointing enabled
    reader.ReadInt32(); // CheckpointEveryNLayers
    reader.ReadInt32(); // VirtualStagesPerRank (informational)
    if (savedWorldSize != WorldSize)
        throw new InvalidOperationException($"World size mismatch: {savedWorldSize} vs {WorldSize}");
    if (savedRank != Rank)
        throw new InvalidOperationException($"Rank mismatch: {savedRank} vs {Rank}");
    if (savedMicroBatchSize != _microBatchSize)
        throw new InvalidOperationException($"Micro batch size mismatch: saved model was trained with {savedMicroBatchSize}, but current instance configured with {_microBatchSize}");
    int modelDataLength = reader.ReadInt32();
    byte[] modelData = reader.ReadBytes(modelDataLength);
    WrappedModel.Deserialize(modelData);
    InitializeSharding();
}
```
```csharp
/// - Selective: Only recompute activations that are needed and not checkpointed (recommended)
/// - Full: Recompute all non-checkpointed activations from the previous checkpoint
/// - None: Don't recompute, equivalent to no checkpointing (for testing/debugging)
/// </para>
/// </remarks>
public RecomputeStrategy RecomputeStrategy { get; set; } = RecomputeStrategy.Selective;
```
ActivationCheckpointConfig.RecomputeStrategy defaults to Selective, but PipelineParallelModel currently throws NotImplementedException when checkpointing is enabled with any strategy other than None. This makes the documented/recommended default unusable as soon as a caller sets Enabled = true without also overriding RecomputeStrategy.
Suggested fix: change the default RecomputeStrategy to None until recomputation is implemented, or remove/relax the constructor guard and make Selective/Full map to the currently-supported behavior.
```diff
-/// - Selective: Only recompute activations that are needed and not checkpointed (recommended)
-/// - Full: Recompute all non-checkpointed activations from the previous checkpoint
-/// - None: Don't recompute, equivalent to no checkpointing (for testing/debugging)
-/// </para>
-/// </remarks>
-public RecomputeStrategy RecomputeStrategy { get; set; } = RecomputeStrategy.Selective;
+/// - Selective: Only recompute activations that are needed and not checkpointed
+/// - Full: Recompute all non-checkpointed activations from the previous checkpoint
+/// - None: Don't recompute, equivalent to no checkpointing (currently the default and
+///   the only strategy guaranteed to be supported)
+/// </para>
+/// </remarks>
+public RecomputeStrategy RecomputeStrategy { get; set; } = RecomputeStrategy.None;
```
```csharp
if (layerBoundaries is null || layerBoundaries.Length == 0)
{
    throw new ArgumentException("Layer boundaries must be provided and non-empty.", nameof(layerBoundaries));
}

// Validate all boundaries are non-negative and strictly increasing
if (layerBoundaries[0] < 0)
{
    throw new ArgumentException(
        $"Layer boundary at index 0 is negative ({layerBoundaries[0]}). All boundaries must be non-negative.",
        nameof(layerBoundaries));
}
```
The explicit-boundaries constructor validates boundaries are non-negative and strictly increasing, but it does not enforce that the first boundary is 0. If layerBoundaries[0] is > 0, BuildLayerSizes() will ignore parameters [0..layerBoundaries[0]-1], causing ComputePartition() to return partitions that don't cover the full parameter vector.
Suggested fix: require layerBoundaries[0] == 0 (or explicitly handle a leading implicit boundary at 0).
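For readers unfamiliar with min-max partitioning, the dynamic program that the PR description attributes to `LoadBalancedPartitionStrategy` can be sketched as follows. This is an illustrative Python version of the textbook algorithm, not the library's code; `dp[i][j]` is the best achievable maximum group sum using the first `i` layers split into `j` contiguous stages:

```python
def min_max_partition(costs, k):
    """Split `costs` into k contiguous groups minimizing the max group sum.

    Returns (boundaries, worst) where boundaries[j] is the start index of
    group j and worst is the resulting maximum per-group load."""
    n = len(costs)
    prefix = [0] * (n + 1)
    for i, c in enumerate(costs):
        prefix[i + 1] = prefix[i] + c
    INF = float("inf")
    dp = [[INF] * (k + 1) for _ in range(n + 1)]
    choice = [[0] * (k + 1) for _ in range(n + 1)]
    dp[0][0] = 0.0
    for j in range(1, k + 1):
        for i in range(1, n + 1):
            for s in range(j - 1, i):  # s = where the last group starts
                cand = max(dp[s][j - 1], prefix[i] - prefix[s])
                if cand < dp[i][j]:
                    dp[i][j], choice[i][j] = cand, s
    # Walk the choice table backwards to recover each group's start index.
    bounds, i = [], n
    for j in range(k, 0, -1):
        i = choice[i][j]
        bounds.append(i)
    return list(reversed(bounds)), dp[n][k]

bounds, worst = min_max_partition([1, 9, 2, 2, 2, 2], 3)
# Optimal split: [1] | [9] | [2,2,2,2] -> boundaries [0, 1, 2], max load 9
```

The O(n^2 * k) cost is negligible next to training, and unlike a uniform parameter split it keeps the heaviest stage as light as possible, which is what bounds pipeline throughput.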
```csharp
for (int vStage = 0; vStage < _virtualStagesPerRank; vStage++)
{
    // Within each loop, apply 1F1B scheduling for this virtual stage
    int numWarmupForwards = Math.Min(numStages - 1 - stageId, numMicroBatches);
    int numSteadyState = Math.Max(0, numMicroBatches - numWarmupForwards);

    // Phase 1: Warmup - forward passes only
    int forwardIdx = 0;
    for (int i = 0; i < numWarmupForwards; i++)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Forward,
            MicroBatchIndex = forwardIdx,
            VirtualStageIndex = vStage,
            IsWarmup = true,
            IsCooldown = false
        });
        forwardIdx++;
    }

    // Phase 2: Steady state - alternating 1F1B
    int backwardIdx = 0;
    for (int i = 0; i < numSteadyState; i++)
    {
        // Forward
        if (forwardIdx < numMicroBatches)
        {
            ops.Add(new PipelineOperation
            {
                Type = PipelineOperationType.Forward,
                MicroBatchIndex = forwardIdx,
                VirtualStageIndex = vStage,
                IsWarmup = false,
                IsCooldown = false
            });
            forwardIdx++;
        }

        // Backward
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Backward,
            MicroBatchIndex = backwardIdx,
            VirtualStageIndex = vStage,
            IsWarmup = false,
            IsCooldown = false
        });
        backwardIdx++;
    }

    // Phase 3: Cooldown - remaining backward passes
    while (backwardIdx < numMicroBatches)
    {
        ops.Add(new PipelineOperation
        {
            Type = PipelineOperationType.Backward,
            MicroBatchIndex = backwardIdx,
            VirtualStageIndex = vStage,
            IsWarmup = false,
            IsCooldown = true
        });
        backwardIdx++;
    }
}

return ops;
```
As written, this schedule runs a complete 1F1B loop for vStage=0 (including backward + cooldown) before emitting any forward ops for vStage=1. But PipelineParallelModel.GetStageInput() requires vStage>0 forwards to consume the forward output of vStage-1 for the same micro-batch. Those outputs will typically have been freed during the vStage=0 backward steps (via FreeNonCheckpointedActivations), so vStage=1 forward will throw InvalidOperationException (missing forward output) or force unbounded activation retention.
Suggested fix: emit operations in an order that respects per-microbatch virtual-stage dependencies (i.e., ensure Forward(m, vStage-1) happens and remains available until Forward(m, vStage) runs), or adjust the pipeline runtime so it retains the needed forward outputs across virtual-stage loops.
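The dependency can be made concrete with a tiny replay check. This is an illustrative Python sketch, not the library's code: the `live` set stands in for the activation store that the backward path prunes, and operations are encoded as hypothetical `(kind, microbatch, virtual_stage)` tuples:

```python
def check_schedule(ops):
    """Replay a schedule, freeing each chunk's forward output at its backward.

    Raises if F(m, v > 0) runs after the output of F(m, v - 1) was freed,
    which is the failure mode described in the comment above."""
    live = set()
    for kind, m, v in ops:
        if kind == "F":
            if v > 0 and (m, v - 1) not in live:
                raise RuntimeError(
                    f"missing forward output for micro-batch {m}, chunk {v - 1}")
            live.add((m, v))
        elif kind == "B":
            live.discard((m, v))

# Per-virtual-stage loops: B(0, 0) frees the output that F(0, 1) still needs.
bad = [("F", 0, 0), ("B", 0, 0), ("F", 0, 1)]
# Emitting both forwards before the first backward satisfies the dependency.
good = [("F", 0, 0), ("F", 0, 1), ("B", 0, 1), ("B", 0, 0)]
check_schedule(good)  # passes; check_schedule(bad) raises RuntimeError
```

A check like this could also run over every schedule implementation in CI, since the bug is purely an ordering property of the emitted operation list.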
```csharp
// Pipeline parallel training using the configured schedule
var scheduleOps = _schedule.GetSchedule(_stageId, _numStages, _microBatchSize);

// Validate schedule output: externally injectable schedules may emit invalid indices
foreach (var op in scheduleOps)
{
    if (op.MicroBatchIndex < 0 || op.MicroBatchIndex >= _microBatchSize)
    {
        throw new InvalidOperationException(
            $"Schedule '{_schedule.Name}' emitted MicroBatchIndex={op.MicroBatchIndex} " +
            $"but valid range is [0, {_microBatchSize - 1}].");
    }

    if (op.VirtualStageIndex < 0 || op.VirtualStageIndex >= _virtualStagesPerRank)
    {
        throw new InvalidOperationException(
            $"Schedule '{_schedule.Name}' emitted VirtualStageIndex={op.VirtualStageIndex} " +
            $"but valid range is [0, {_virtualStagesPerRank - 1}].");
    }
}
```
New behavior here (schedule-driven execution, virtual stages, micro-batch slicing, B/W decomposition, checkpointing/partition-strategy integration) is not covered by the existing distributed-training tests, which currently only validate basic construction/metadata for PipelineParallelModel. Adding unit tests that validate:

- `GetSchedule()` sequences for each schedule type
- correct handling of `VirtualStageIndex` dependencies
- multi-microbatch slicing correctness
- backward compatibility for defaults

would reduce risk of regressions in these complex control-flow changes.
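Invariant tests of this kind are cheap to write. The Python sketch below is illustrative only: `one_f_one_b` is a simplified single-stage model of a 1F1B generator (not the library's implementation), and `validate_schedule` encodes properties every schedule should satisfy, such as exactly one forward and one backward per micro-batch with backward never preceding forward:

```python
def validate_schedule(ops, num_micro_batches):
    """Invariants any schedule (GPipe, 1F1B, ZB-*) should satisfy per stage."""
    forwards_seen = set()
    backward_count = {}
    for kind, m in ops:
        assert 0 <= m < num_micro_batches, f"micro-batch index {m} out of range"
        if kind == "F":
            assert m not in forwards_seen, "duplicate forward"
            forwards_seen.add(m)
        else:  # "B"
            assert m in forwards_seen, "backward before forward"
            backward_count[m] = backward_count.get(m, 0) + 1
    assert len(forwards_seen) == num_micro_batches
    assert all(backward_count.get(m) == 1 for m in range(num_micro_batches))

def one_f_one_b(stage, num_stages, m):
    """Single-stage 1F1B: warmup forwards, alternate 1F1B, cooldown backwards."""
    warmup = min(num_stages - 1 - stage, m)
    ops, f, b = [], 0, 0
    for _ in range(warmup):
        ops.append(("F", f)); f += 1
    while b < m:
        if f < m:
            ops.append(("F", f)); f += 1
        ops.append(("B", b)); b += 1
    return ops

for stage in range(4):
    validate_schedule(one_f_one_b(stage, num_stages=4, m=8), 8)
```

Running the validator over a sweep of stage counts and micro-batch counts catches most ordering regressions without any distributed infrastructure.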
```csharp
// Extract this stage's parameter shard
var updatedParams = WrappedModel.GetParameters();
UpdateLocalShardFromFull(updatedParams);
InvalidateCache();
```
At the end of training, the model calls UpdateLocalShardFromFull(updatedParams), which assumes the local shard is a single contiguous slice (ShardStartIndex..ShardStartIndex+ShardSize). For multi-stage schedules this class explicitly constructs the local shard as a concatenation of multiple non-contiguous chunks, so this update will extract the wrong parameters and corrupt LocalShard.
Suggested fix: override the shard-update path for VirtualStagesPerRank > 1 to rebuild LocalShard by copying each virtual-stage partition into the correct offsets (mirroring the extraction logic in InitializeSharding()).
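The shape of that fix is simple to sketch. The Python below is illustrative (hypothetical names; the real code works on parameter vectors, not lists): the local shard is rebuilt as a concatenation of one `(start, length)` chunk per virtual stage, rather than a single contiguous slice:

```python
def rebuild_local_shard(full_params, partitions):
    """Rebuild the local shard from non-contiguous (start, length) chunks.

    With VirtualStagesPerRank > 1 the local shard is a concatenation of one
    chunk per virtual stage, so a single contiguous
    [ShardStartIndex : ShardStartIndex + ShardSize] slice would be wrong."""
    shard = []
    for start, length in partitions:
        shard.extend(full_params[start:start + length])
    return shard

full = list(range(12))
# Hypothetical rank owning virtual stages 0 and 2 of a 4-chunk split.
shard = rebuild_local_shard(full, [(0, 3), (6, 3)])
# shard == [0, 1, 2, 6, 7, 8]; a contiguous full[0:6] would be [0..5]
```

The same per-chunk offsets used during extraction in `InitializeSharding()` can drive the copy-back, keeping the two code paths symmetric.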
Apply -p:UseSharedCompilation=false to both Build (Windows) and SonarCloud Analysis build steps. VBCSCompiler holds file locks on AiDotNet.Generators.dll when building the solution, causing CS2012 errors when multiple projects compile the generator concurrently. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
```csharp
}

// Fallback estimate for small M
return (double)((long)numStages - numMicroBatches) / (3L * numMicroBatches + numStages);
```
Check failure - Code scanning / CodeQL: Possible loss of precision

Copilot could not generate an autofix suggestion for this alert.
```csharp
// For insufficient micro-batches, small residual bubble
// With V=2 virtual stages, the bubble is reduced compared to ZB-H1
return (double)((long)numStages - numMicroBatches) / (6L * numMicroBatches + numStages);
```
Check failure - Code scanning / CodeQL: Possible loss of precision

Copilot Autofix suggestion:
In general, to avoid overflow when converting the result of an integer multiplication to a floating‑point value, perform the multiplication in floating‑point space instead of integer space. This is achieved by casting one of the operands to a floating‑point type before the arithmetic, ensuring the entire expression is evaluated in floating‑point and cannot overflow in the intermediate integer step.
In this specific case, we should change the denominator of the fraction in EstimateBubbleFraction so that it is evaluated as double arithmetic from the start. The numerator is already explicitly cast to double, so it is safe. For the denominator, replacing 6L * numMicroBatches + numStages with 6.0 * numMicroBatches + numStages (or explicitly casting numMicroBatches to double) ensures that the multiplication is done in double rather than long, eliminating the potential for integer overflow. This keeps the mathematical formula the same, but avoids the flagged pattern. No additional methods or imports are needed; we only adjust the literal type used in the denominator expression within ZeroBubbleVSchedule.EstimateBubbleFraction in src/DistributedTraining/ZeroBubbleVSchedule.cs.
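The failure mode is easy to demonstrate. Python integers never wrap, so the sketch below simulates C#'s 64-bit signed `long` two's-complement arithmetic explicitly; the micro-batch count is absurdly large purely to trigger the wrap:

```python
def wrap_i64(x):
    """Simulate C# 64-bit signed `long` two's-complement wraparound."""
    return (x + 2**63) % 2**64 - 2**63

P = 4                  # numStages
M = 2**61              # numMicroBatches - unrealistic, chosen to overflow
long_denominator = wrap_i64(6 * M + P)   # what `6L * numMicroBatches + numStages` yields
double_denominator = 6.0 * M + P         # what `6.0 * numMicroBatches + numStages` yields
assert long_denominator < 0              # wrapped negative: a nonsense denominator
assert double_denominator > 0            # double arithmetic stays positive
```

Promoting the expression to `double` before the multiply keeps the same formula while removing the overflowable integer intermediate, which is exactly what the suggested one-character change (`6L` to `6.0`) does.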
```diff
@@ -257,6 +257,6 @@
 
     // For insufficient micro-batches, small residual bubble
     // With V=2 virtual stages, the bubble is reduced compared to ZB-H1
-    return (double)((long)numStages - numMicroBatches) / (6L * numMicroBatches + numStages);
+    return (double)((long)numStages - numMicroBatches) / (6.0 * numMicroBatches + numStages);
     }
 }
```
Summary
Implements the three production optimizations for pipeline parallel training described in issue #463:
- `LoadBalancedPartitionStrategy` uses dynamic programming (min-max partitioning) to distribute computational cost evenly across stages, replacing the naive uniform parameter split. Supports custom cost estimators and automatic layer boundary detection.
- `OneForwardOneBackwardSchedule` interleaves forward and backward passes to reduce pipeline bubble from ~50% (GPipe) to ~12-15%. Three phases: warmup, steady-state alternating 1F1B, and cooldown.
- `ActivationCheckpointConfig` enables trading compute for memory by only storing activations at checkpoint layers. Reduces memory from O(L) to O(sqrt(L)) with configurable checkpoint frequency and recompute strategies (Selective, Full, None).

All three optimizations are integrated into `PipelineParallelModel` with full backward compatibility - the default constructor behavior is identical to before (uniform partition, GPipe schedule, no checkpointing).

New files (7):
- `src/Interfaces/IPipelinePartitionStrategy.cs` - Strategy interface for custom partitioning
- `src/Interfaces/IPipelineSchedule.cs` - Schedule interface + PipelineOperation/PipelineOperationType
- `src/DistributedTraining/UniformPartitionStrategy.cs` - Default uniform partitioning
- `src/DistributedTraining/LoadBalancedPartitionStrategy.cs` - DP-based load balancing
- `src/DistributedTraining/GPipeSchedule.cs` - Standard all-forward-then-all-backward schedule
- `src/DistributedTraining/OneForwardOneBackwardSchedule.cs` - Interleaved 1F1B schedule
- `src/DistributedTraining/ActivationCheckpointConfig.cs` - Checkpoint config + RecomputeStrategy enum

Modified files (1):

- `src/DistributedTraining/PipelineParallelModel.cs` - Integrated all optimizations

Closes #463
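As a quick sanity check on the quoted bubble ranges, the textbook pipeline bubble fraction (P - 1) / (M + P - 1) can be evaluated directly; the ~50% and ~12-15% figures correspond to few versus many micro-batches per step under this standard formula (illustrative Python, assuming uniform stage times):

```python
def bubble_fraction(num_stages, num_micro_batches):
    """Textbook pipeline bubble fraction: idle / total = (P - 1) / (M + P - 1)."""
    P, M = num_stages, num_micro_batches
    return (P - 1) / (M + P - 1)

# P = 4 stages: 3 micro-batches leave the pipeline idle half the time,
# while 24 micro-batches bring the bubble near 11%.
assert bubble_fraction(4, 3) == 0.5
assert 0.10 < bubble_fraction(4, 24) < 0.12
```

Larger M shrinks the bubble for any schedule; 1F1B's additional advantage over GPipe is that it sustains large M without GPipe's peak activation memory, and the zero-bubble (ZB-*) schedules cut the residual idle time further by splitting backward into B and W steps.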
Test plan
🤖 Generated with Claude Code
Summary by CodeRabbit